This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 7b58fffdeeb [SPARK-46100][CORE][PYTHON] Reduce stack depth by replace (string|array).size with (string|array).length
7b58fffdeeb is described below

commit 7b58fffdeeb70524e18ad80ea0aa53e2ac910e2a
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Sat Nov 25 14:38:34 2023 -0600

[SPARK-46100][CORE][PYTHON] Reduce stack depth by replace (string|array).size with (string|array).length

### What changes were proposed in this pull request?
There are a lot of calls to `[string|array].size`. On arrays and strings, `size` just calls the underlying `length`, so the extra call increases the stack depth. We should call `[string|array].length` directly. We also get the compiler warning `Replace .size with .length on arrays and strings`. This PR only improves the core module.

### Why are the changes needed?
Reduce stack depth by replacing `(string|array).size` with `(string|array).length`.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44011 from beliefer/SPARK-46100.

Authored-by: Jiaan Geng <belie...@163.com>
Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../org/apache/spark/api/python/PythonRunner.scala | 2 +-
 .../apache/spark/deploy/master/ui/MasterPage.scala | 4 +-
 .../apache/spark/executor/ExecutorMetrics.scala | 2 +-
 .../org/apache/spark/resource/ResourceUtils.scala | 2 +-
 .../apache/spark/scheduler/TaskDescription.scala | 2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 4 +-
 .../org/apache/spark/ui/ConsoleProgressBar.scala | 2 +-
 .../org/apache/spark/util/HadoopFSUtils.scala | 2 +-
 .../util/io/ChunkedByteBufferFileRegion.scala | 2 +-
 .../scala/org/apache/spark/CheckpointSuite.scala | 16 ++---
 .../scala/org/apache/spark/DistributedSuite.scala | 16 ++---
 .../test/scala/org/apache/spark/FileSuite.scala | 2 +-
 .../org/apache/spark/MapOutputTrackerSuite.scala | 4 +-
 .../scala/org/apache/spark/PartitioningSuite.scala | 4 +-
 .../test/scala/org/apache/spark/ShuffleSuite.scala | 2 +-
 .../spark/deploy/DecommissionWorkerSuite.scala | 2 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 4 +-
 .../deploy/StandaloneDynamicAllocationSuite.scala | 22 +++---
 .../spark/deploy/client/AppClientSuite.scala | 6 +-
 .../deploy/history/FsHistoryProviderSuite.scala | 20 +++---
 .../deploy/rest/StandaloneRestSubmitSuite.scala | 2 +-
 .../input/WholeTextFileRecordReaderSuite.scala | 4 +-
 .../internal/plugin/PluginContainerSuite.scala | 4 +-
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala | 2 +-
 .../apache/spark/rdd/LocalCheckpointSuite.scala | 2 +-
 .../apache/spark/rdd/PairRDDFunctionsSuite.scala | 44 ++++++------
 .../scala/org/apache/spark/rdd/PipedRDDSuite.scala | 10 +--
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 80 +++++++++++-----------
 .../scala/org/apache/spark/rdd/SortingSuite.scala | 6 +-
 .../apache/spark/rdd/ZippedPartitionsSuite.scala | 4 +-
 .../spark/resource/ResourceProfileSuite.scala | 2 +-
 .../apache/spark/resource/ResourceUtilsSuite.scala | 6 +-
 .../apache/spark/scheduler/AQEShuffledRDD.scala | 2 +-
 .../CoarseGrainedSchedulerBackendSuite.scala | 2 +-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 32 ++++-----
 .../apache/spark/scheduler/MapStatusSuite.scala | 2 +-
 .../scheduler/OutputCommitCoordinatorSuite.scala | 8 +--
 .../spark/scheduler/TaskSchedulerImplSuite.scala | 12 ++--
 .../spark/scheduler/TaskSetManagerSuite.scala | 4 +-
 .../KryoSerializerDistributedSuite.scala | 2 +-
.../sort/IndexShuffleBlockResolverSuite.scala | 2 +- .../org/apache/spark/storage/DiskStoreSuite.scala | 2 +- .../org/apache/spark/util/FileAppenderSuite.scala | 4 +- .../spark/util/collection/SizeTrackerSuite.scala | 2 +- 44 files changed, 180 insertions(+), 180 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index d6363182606..e6d5a750ea3 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -378,7 +378,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( resources.foreach { case (k, v) => PythonRDD.writeUTF(k, dataOut) PythonRDD.writeUTF(v.name, dataOut) - dataOut.writeInt(v.addresses.size) + dataOut.writeInt(v.addresses.length) v.addresses.foreach { case addr => PythonRDD.writeUTF(addr, dataOut) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index cb325b37958..b2f35984d37 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -83,13 +83,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { .flatMap(_.iterator) .groupBy(_._1) // group by resource name .map { case (rName, rInfoArr) => - rName -> rInfoArr.map(_._2.addresses.size).sum + rName -> rInfoArr.map(_._2.addresses.length).sum } val usedInfo = aliveWorkers.map(_.resourcesInfoUsed) .flatMap(_.iterator) .groupBy(_._1) // group by resource name .map { case (rName, rInfoArr) => - rName -> rInfoArr.map(_._2.addresses.size).sum + rName -> rInfoArr.map(_._2.addresses.length).sum } formatResourcesUsed(totalInfo, usedInfo) } diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala index 486e5965221..8c474e9b76c 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala @@ -46,7 +46,7 @@ class ExecutorMetrics private[spark] extends Serializable { private[spark] def this(metrics: Array[Long]) = { this() - Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size)) + Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.length, this.metrics.length)) } private[spark] def this(metrics: AtomicLongArray) = { diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index 9080be01a9e..00c655f4a4f 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -303,7 +303,7 @@ private[spark] object ResourceUtils extends Logging { allocations: Map[String, ResourceInformation], execReqs: Map[String, ExecutorResourceRequest]): Unit = { execReqs.foreach { case (rName, req) => - require(allocations.contains(rName) && allocations(rName).addresses.size >= req.amount, + require(allocations.contains(rName) && allocations(rName).addresses.length >= req.amount, s"Resource: ${rName}, with addresses: " + s"${allocations(rName).addresses.mkString(",")} " + s"is less than what the user requested: ${req.amount})") diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 6e6507782a4..75032086ead 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -80,7 +80,7 @@ private[spark] object TaskDescription { map.foreach { case (key, value) => dataOut.writeUTF(key) dataOut.writeUTF(value.name) - dataOut.writeInt(value.addresses.size) + dataOut.writeInt(value.addresses.length) value.addresses.foreach(dataOut.writeUTF(_)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 41f6b3ad64b..15ae2fef221 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -434,7 +434,7 @@ private[spark] class TaskSchedulerImpl( // addresses are the same as that we allocated in taskResourceAssignments since it's // synchronized. We don't remove the exact addresses allocated because the current // approach produces the identical result with less time complexity. - availableResources(i)(rName).remove(0, rInfo.addresses.size) + availableResources(i)(rName).remove(0, rInfo.addresses.length) } } } catch { @@ -752,7 +752,7 @@ private[spark] class TaskSchedulerImpl( .mkString(",") addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr)) - logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for " + + logInfo(s"Successfully scheduled all the ${addressesWithDescs.length} tasks for " + s"barrier stage ${taskSet.stageId}.") } taskSet.barrierPendingLaunchTasks.clear() diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala index dff94b4e875..b5473e07694 100644 --- a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala +++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala @@ -74,7 +74,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging { * the progress bar, then progress bar will be showed in next line without overwrite logs. 
*/ private def show(now: Long, stages: Seq[StageData]): Unit = { - val width = TerminalWidth / stages.size + val width = TerminalWidth / stages.length val bar = stages.map { s => val total = s.numTasks val header = s"[Stage ${s.stageId}:" diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index 3245a528b74..4c7b12f60cc 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -245,7 +245,7 @@ private[spark] object HadoopFSUtils extends Logging { val allLeafStatuses = { val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) val filteredNestedFiles: Seq[FileStatus] = contextOpt match { - case Some(context) if dirs.size > parallelismThreshold => + case Some(context) if dirs.length > parallelismThreshold => parallelListLeafFilesInternal( context, dirs.map(_.getPath).toImmutableArraySeq, diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala index 23fc0f88f0b..ec74ce0473e 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala @@ -69,7 +69,7 @@ private[io] class ChunkedByteBufferFileRegion( if (keepGoing) { // advance to the next chunk (if there are any more) currentChunkIdx += 1 - if (currentChunkIdx == chunks.size) { + if (currentChunkIdx == chunks.length) { keepGoing = false } else { currentChunk = chunks(currentChunkIdx) diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index c425596eb04..874f4896bb0 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -170,10 +170,10 @@ trait RDDCheckpointTester { self: SparkFunSuite => * upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint. 
*/ private def getSerializedSizes(rdd: RDD[_]): (Int, Int) = { - val rddSize = Utils.serialize(rdd).size - val rddCpDataSize = Utils.serialize(rdd.checkpointData).size - val rddPartitionSize = Utils.serialize(rdd.partitions).size - val rddDependenciesSize = Utils.serialize(rdd.dependencies).size + val rddSize = Utils.serialize(rdd).length + val rddCpDataSize = Utils.serialize(rdd.checkpointData).length + val rddPartitionSize = Utils.serialize(rdd.partitions).length + val rddDependenciesSize = Utils.serialize(rdd.dependencies).length // Print detailed size, helps in debugging logInfo("Serialized sizes of " + rdd + @@ -339,7 +339,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS runTest("ParallelCollectionRDD") { reliableCheckpoint: Boolean => val parCollection = sc.makeRDD(1 to 4, 2) - val numPartitions = parCollection.partitions.size + val numPartitions = parCollection.partitions.length checkpoint(parCollection, reliableCheckpoint) assert(parCollection.dependencies === Nil) val result = parCollection.collect() @@ -358,7 +358,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS val blockManager = SparkEnv.get.blockManager blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY) val blockRDD = new BlockRDD[String](sc, Array(blockId)) - val numPartitions = blockRDD.partitions.size + val numPartitions = blockRDD.partitions.length checkpoint(blockRDD, reliableCheckpoint) val result = blockRDD.collect() if (reliableCheckpoint) { @@ -507,7 +507,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS runTest("CheckpointRDD with zero partitions") { reliableCheckpoint: Boolean => val rdd = new BlockRDD[Int](sc, Array.empty[BlockId]) - assert(rdd.partitions.size === 0) + assert(rdd.partitions.length === 0) assert(rdd.isCheckpointed === false) assert(rdd.isCheckpointedAndMaterialized === false) checkpoint(rdd, reliableCheckpoint) @@ -516,7 +516,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS assert(rdd.count() === 0) assert(rdd.isCheckpointed) assert(rdd.isCheckpointedAndMaterialized) - assert(rdd.partitions.size === 0) + assert(rdd.partitions.length === 0) } runTest("checkpointAllMarkedAncestors") { reliableCheckpoint: Boolean => diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index e156533be15..a2b09f0ef3c 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -80,7 +80,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex sc = new SparkContext(clusterUrl, "test") val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1)), 5) val groups = pairs.groupByKey(5).collect() - assert(groups.size === 2) + assert(groups.length === 2) val valuesFor1 = groups.find(_._1 == 1).get._2 assert(valuesFor1.toList.sorted === List(1, 2, 3)) val valuesFor2 = groups.find(_._1 == 2).get._2 @@ -264,8 +264,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex sc = new SparkContext(clusterUrl, "test") val data = sc.parallelize(Seq(true, true), 2) assert(data.count() === 2) // force executors to start - assert(data.map(markNodeIfIdentity).collect().size === 2) - assert(data.map(failOnMarkedIdentity).collect().size === 2) + assert(data.map(markNodeIfIdentity).collect().length === 2) + assert(data.map(failOnMarkedIdentity).collect().length === 2) 
} test("recover from repeated node failures during shuffle-map") { @@ -275,7 +275,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex for (i <- 1 to 3) { val data = sc.parallelize(Seq(true, false), 2) assert(data.count() === 2) - assert(data.map(markNodeIfIdentity).collect().size === 2) + assert(data.map(markNodeIfIdentity).collect().length === 2) assert(data.map(failOnMarkedIdentity).map(x => x -> x).groupByKey().count() === 2) } } @@ -287,7 +287,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex for (i <- 1 to 3) { val data = sc.parallelize(Seq(true, true), 2) assert(data.count() === 2) - assert(data.map(markNodeIfIdentity).collect().size === 2) + assert(data.map(markNodeIfIdentity).collect().length === 2) // This relies on mergeCombiners being used to perform the actual reduce for this // test to actually be testing what it claims. val grouped = data.map(x => x -> x).combineByKey( @@ -295,7 +295,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex (x: Boolean, y: Boolean) => x, (x: Boolean, y: Boolean) => failOnMarkedIdentity(x) ) - assert(grouped.collect().size === 1) + assert(grouped.collect().length === 1) } } @@ -310,8 +310,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex data.persist(StorageLevel.MEMORY_ONLY_2) assert(data.count() === 4) - assert(data.map(markNodeIfIdentity).collect().size === 4) - assert(data.map(failOnMarkedIdentity).collect().size === 4) + assert(data.map(markNodeIfIdentity).collect().length === 4) + assert(data.map(failOnMarkedIdentity).collect().length === 4) // Create a new replicated RDD to make sure that cached peer information doesn't cause // problems. diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 4a2b2339159..7750db60208 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -236,7 +236,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { // Try reading the output back as an object file val ct = reflect.ClassTag[Any](Utils.classForName(className, noSparkClassLoader = true)) val output = sc.objectFile[Any](outputDir) - assert(output.collect().size === 3) + assert(output.collect().length === 3) assert(output.collect().head.getClass.getName === className) } } diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index dde30aee828..5d635011d2e 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -237,13 +237,13 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { // as it has 4 out of 7 bytes of output. val topLocs50 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.5) assert(topLocs50.nonEmpty) - assert(topLocs50.get.size === 1) + assert(topLocs50.get.length === 1) assert(topLocs50.get.head === BlockManagerId("a", "hostA", 1000)) // When the threshold is 20%, both hosts should be returned as preferred locations. 
val topLocs20 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.2) assert(topLocs20.nonEmpty) - assert(topLocs20.get.size === 2) + assert(topLocs20.get.length === 2) assert(topLocs20.get.toSet === Seq(BlockManagerId("a", "hostA", 1000), BlockManagerId("b", "hostB", 1000)).toSet) diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index 28fa9f5e23e..3447ba8c176 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -77,7 +77,7 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva for (element <- 1 to 1000) { val partition = partitioner.getPartition(element) if (numPartitions > 1) { - if (partition < rangeBounds.size) { + if (partition < rangeBounds.length) { assert(element <= rangeBounds(partition)) } if (partition > 0) { @@ -111,7 +111,7 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva assert(count === rdd.count()) sketched.foreach { case (idx, n, sample) => assert(n === idx) - assert(sample.size === math.min(n, sampleSizePerPartition)) + assert(sample.length === math.min(n, sampleSizePerPartition)) } } diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index a92d532907a..ac10a00d98e 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -51,7 +51,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalRootDi sc = new SparkContext("local", "test", myConf) val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1)), 4) val groups = pairs.groupByKey(4).collect() - assert(groups.size === 2) + assert(groups.length === 2) val valuesFor1 = groups.find(_._1 == 1).get._2 assert(valuesFor1.toList.sorted === List(1, 2, 3)) val valuesFor2 = groups.find(_._1 == 2).get._2 diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala index 3b3bcff0c5a..20993df718a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala @@ -439,7 +439,7 @@ class DecommissionWorkerSuite val appId = sc.applicationId eventually(timeout(1.minute), interval(1.seconds)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.getExecutorLimit === Int.MaxValue) } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index a032e9aa16b..553d001285b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -1736,7 +1736,7 @@ object SimpleApplicationTest { .map(x => SparkEnv.get.conf.get(config)) .collect() .distinct - if (executorValues.size != 1) { + if (executorValues.length != 1) { throw new SparkException(s"Inconsistent values for $config: " + s"${executorValues.mkString("values(", ", ", ")")}") } @@ -1795,7 +1795,7 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem { class TestSparkApplication extends SparkApplication with Matchers { override def start(args: Array[String], conf: SparkConf): Unit = { - assert(args.size === 
1) + assert(args.length === 1) assert(args(0) === "hello") assert(conf.get("spark.test.hello") === "world") assert(sys.props.get("spark.test.hello") === None) diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 01995ca3632..5ecc551c16b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -69,7 +69,7 @@ class StandaloneDynamicAllocationSuite workers = makeWorkers(10, 2048) // Wait until all workers register with master successfully eventually(timeout(1.minute), interval(10.milliseconds)) { - assert(getMasterState.workers.size === numWorkers) + assert(getMasterState.workers.length === numWorkers) } } @@ -93,7 +93,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.getExecutorLimit === Int.MaxValue) @@ -140,7 +140,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4)) @@ -195,7 +195,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8)) @@ -248,7 +248,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 10) // 20 cores total assert(apps.head.getExecutorLimit === Int.MaxValue) @@ -302,7 +302,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 4) // 8 cores total assert(apps.head.getExecutorLimit === Int.MaxValue) @@ -360,7 +360,7 @@ class StandaloneDynamicAllocationSuite sc.requestExecutors(2) eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.getExecutorLimit === 2) @@ -385,7 +385,7 @@ class StandaloneDynamicAllocationSuite sc.requestExecutors(2) eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.getExecutorLimit === 2) @@ -425,7 +425,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + 
assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.getExecutorLimit === Int.MaxValue) @@ -465,7 +465,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === initialExecutorLimit) assert(apps.head.getExecutorLimit === initialExecutorLimit) @@ -477,7 +477,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.getExecutorLimit === Int.MaxValue) diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index d109ed8442d..3555faf5c2c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -71,7 +71,7 @@ class AppClientSuite workers = makeWorkers(10, 2048) // Wait until all workers register with master successfully eventually(timeout(1.minute), interval(10.milliseconds)) { - assert(getMasterState.workers.size === numWorkers) + assert(getMasterState.workers.length === numWorkers) } } @@ -99,7 +99,7 @@ class AppClientSuite eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() assert(ci.listener.connectedIdList.size === 1, "client listener should have one connection") - assert(apps.size === 1, "master should have 1 registered app") + assert(apps.length === 1, "master should have 1 registered app") } // Send message to Master to request Executors, verify request by change in executor limit @@ -176,7 +176,7 @@ class AppClientSuite eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() assert(ci.listener.connectedIdList.size === 1, "client listener should have one connection") - assert(apps.size === 1, "master should have 1 registered app") + assert(apps.length === 1, "master should have 1 registered app") } // Send message to Master to request Executors with multiple resource profiles. diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index d16e904bdcf..3013a5bf4a2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -1113,13 +1113,13 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P provider.checkForLogs() provider.cleanLogs() - assert(new File(testDir.toURI).listFiles().size === logCount) + assert(new File(testDir.toURI).listFiles().length === logCount) // Move the clock forward 1 day and scan the files again. They should still be there. clock.advance(TimeUnit.DAYS.toMillis(1)) provider.checkForLogs() provider.cleanLogs() - assert(new File(testDir.toURI).listFiles().size === logCount) + assert(new File(testDir.toURI).listFiles().length === logCount) // Update the slow app to contain valid info. Code should detect the change and not clean // it up. 
@@ -1133,7 +1133,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P clock.advance(TimeUnit.DAYS.toMillis(2)) provider.checkForLogs() provider.cleanLogs() - assert(new File(testDir.toURI).listFiles().size === validLogCount) + assert(new File(testDir.toURI).listFiles().length === validLogCount) } test("always find end event for finished apps") { @@ -1414,12 +1414,12 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P provider.checkForLogs() // The invalid application log file would be cleaned by checkAndCleanLog(). - assert(new File(testDir.toURI).listFiles().size === 1) + assert(new File(testDir.toURI).listFiles().length === 1) clock.advance(1) // cleanLogs() would clean the valid application log file. provider.cleanLogs() - assert(new File(testDir.toURI).listFiles().size === 0) + assert(new File(testDir.toURI).listFiles().length === 0) } private def assertOptionAfterSerde(opt: Option[Long], expected: Option[Long]): Unit = { @@ -1556,7 +1556,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) provider.checkForLogs() provider.cleanLogs() - assert(dir.listFiles().size === 1) + assert(dir.listFiles().length === 1) assert(provider.getListing().length === 1) // Manually delete the appstatus file to make an invalid rolling event log @@ -1578,7 +1578,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P provider.checkForLogs() provider.cleanLogs() assert(provider.getListing().length === 1) - assert(dir.listFiles().size === 2) + assert(dir.listFiles().length === 2) // Make sure a new provider sees the valid application provider.stop() @@ -1615,7 +1615,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P // The 1st checkForLogs should scan/update app2 only since it is newer than app1 provider.checkForLogs() assert(provider.getListing().length === 1) - assert(dir.listFiles().size === 2) + assert(dir.listFiles().length === 2) assert(provider.getListing().map(e => e.id).contains("app2")) assert(!provider.getListing().map(e => e.id).contains("app1")) @@ -1630,7 +1630,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P // The 2nd checkForLogs should scan/update app3 only since it is newer than app1 provider.checkForLogs() assert(provider.getListing().length === 2) - assert(dir.listFiles().size === 3) + assert(dir.listFiles().length === 3) assert(provider.getListing().map(e => e.id).contains("app3")) assert(!provider.getListing().map(e => e.id).contains("app1")) @@ -1655,7 +1655,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) provider.checkForLogs() provider.cleanLogs() - assert(dir.listFiles().size === 1) + assert(dir.listFiles().length === 1) assert(provider.getListing().length === 1) // Manually delete event log files and create event log file reader diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 2f645e69079..abe05a80558 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -289,7 +289,7 @@ class StandaloneRestSubmitSuite extends SparkFunSuite { val statusRequestPath = 
s"$httpUrl/$v/submissions/status" val goodJson = constructSubmitRequest(masterUrl).toJson val badJson1 = goodJson.replaceAll("action", "fraction") // invalid JSON - val badJson2 = goodJson.substring(goodJson.size / 2) // malformed JSON + val badJson2 = goodJson.substring(goodJson.length / 2) // malformed JSON val notJson = "\"hello, world\"" val (response1, code1) = sendHttpRequestWithResponse(submitRequestPath, "POST") // missing JSON val (response2, code2) = sendHttpRequestWithResponse(submitRequestPath, "POST", badJson1) diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala index e64ebe2a551..0fc0b753606 100644 --- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala @@ -97,7 +97,7 @@ class WholeTextFileRecordReaderSuite extends SparkFunSuite { val res = sc.wholeTextFiles(dir.toString, 3).collect() - assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size, + assert(res.length === WholeTextFileRecordReaderSuite.fileNames.length, "Number of files read out does not fit with the actual value.") for ((filename, contents) <- res) { @@ -120,7 +120,7 @@ class WholeTextFileRecordReaderSuite extends SparkFunSuite { val res = sc.wholeTextFiles(dir.toString, 3).collect() - assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size, + assert(res.length === WholeTextFileRecordReaderSuite.fileNames.length, "Number of files read out does not fit with the actual value.") for ((filename, contents) <- res) { diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala index ef214bd50d9..95b484d7176 100644 --- a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala +++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala @@ -214,11 +214,11 @@ class PluginContainerSuite extends SparkFunSuite with LocalSparkContext { } val execFiles = children.filter(_.getName.startsWith(NonLocalModeSparkPlugin.executorFileStr)) - assert(execFiles.size === 1) + assert(execFiles.length === 1) val allLines = Files.readLines(execFiles(0), StandardCharsets.UTF_8) assert(allLines.size === 1) val addrs = NonLocalModeSparkPlugin.extractGpuAddrs(allLines.get(0)) - assert(addrs.size === 2) + assert(addrs.length === 2) assert(addrs.sorted === Array("3", "4")) assert(NonLocalModeSparkPlugin.driverContext != null) diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index 56783de1c13..4239180ba6c 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -91,7 +91,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with TimeLimits { val expected = input.take(num) val saw = rdd.takeAsync(num).get() assert(saw == expected, "incorrect result for rdd with %d partitions (expected %s, saw %s)" - .format(rdd.partitions.size, expected, saw)) + .format(rdd.partitions.length, expected, saw)) } val input = Range(1, 1000) diff --git a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala index f644fee74a1..591b8b4c0df 100644 --- 
a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala @@ -159,7 +159,7 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext { test("missing checkpoint block fails with informative message") { val rdd = newRdd.localCheckpoint() - val numPartitions = rdd.partitions.size + val numPartitions = rdd.partitions.length val partitionIndices = rdd.partitions.map(_.index) val bmm = sc.env.blockManager.master diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 9b60d2eeeed..e436d988434 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -41,7 +41,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val pairs = sc.parallelize(Seq((1, 1), (1, 1), (3, 2), (5, 1), (5, 3)), 2) val sets = pairs.aggregateByKey(new HashSet[Int]())(_ += _, _ ++= _).collect() - assert(sets.size === 3) + assert(sets.length === 3) val valuesFor1 = sets.find(_._1 == 1).get._2 assert(valuesFor1.toList.sorted === List(1)) val valuesFor3 = sets.find(_._1 == 3).get._2 @@ -53,7 +53,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { test("groupByKey") { val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1))) val groups = pairs.groupByKey().collect() - assert(groups.size === 2) + assert(groups.length === 2) val valuesFor1 = groups.find(_._1 == 1).get._2 assert(valuesFor1.toList.sorted === List(1, 2, 3)) val valuesFor2 = groups.find(_._1 == 2).get._2 @@ -63,7 +63,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { test("groupByKey with duplicates") { val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) val groups = pairs.groupByKey().collect() - assert(groups.size === 2) + assert(groups.length === 2) val valuesFor1 = groups.find(_._1 == 1).get._2 assert(valuesFor1.toList.sorted === List(1, 1, 2, 3)) val valuesFor2 = groups.find(_._1 == 2).get._2 @@ -73,7 +73,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { test("groupByKey with negative key hash codes") { val pairs = sc.parallelize(Seq((-1, 1), (-1, 2), (-1, 3), (2, 1))) val groups = pairs.groupByKey().collect() - assert(groups.size === 2) + assert(groups.length === 2) val valuesForMinus1 = groups.find(_._1 == -1).get._2 assert(valuesForMinus1.toList.sorted === List(1, 2, 3)) val valuesFor2 = groups.find(_._1 == 2).get._2 @@ -83,7 +83,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { test("groupByKey with many output partitions") { val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1))) val groups = pairs.groupByKey(10).collect() - assert(groups.size === 2) + assert(groups.length === 2) val valuesFor1 = groups.find(_._1 == 1).get._2 assert(valuesFor1.toList.sorted === List(1, 2, 3)) val valuesFor2 = groups.find(_._1 == 2).get._2 @@ -249,7 +249,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.join(rdd2).collect() - assert(joined.size === 4) + assert(joined.length === 4) assert(joined.toSet === Set( (1, (1, 'x')), (1, (2, 'x')), @@ -262,7 +262,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with 
SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (1, 3))) val rdd2 = sc.parallelize(Seq((1, 'x'), (1, 'y'))) val joined = rdd1.join(rdd2).collect() - assert(joined.size === 6) + assert(joined.length === 6) assert(joined.toSet === Set( (1, (1, 'x')), (1, (1, 'y')), @@ -277,7 +277,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.leftOuterJoin(rdd2).collect() - assert(joined.size === 5) + assert(joined.length === 5) assert(joined.toSet === Set( (1, (1, Some('x'))), (1, (2, Some('x'))), @@ -296,7 +296,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd2 = sc.emptyRDD[(Int, Int)](intPairCT) val joined = rdd1.cogroup(rdd2).collect() - assert(joined.size > 0) + assert(joined.length > 0) } // See SPARK-9326 @@ -307,7 +307,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.emptyRDD[Int](intCT).groupBy((x) => 5) val joined = rdd1.cogroup(rdd2).collect() - assert(joined.size > 0) + assert(joined.length > 0) } // See SPARK-22465 @@ -377,7 +377,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.rightOuterJoin(rdd2).collect() - assert(joined.size === 5) + assert(joined.length === 5) assert(joined.toSet === Set( (1, (Some(1), 'x')), (1, (Some(2), 'x')), @@ -391,7 +391,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.fullOuterJoin(rdd2).collect() - assert(joined.size === 6) + assert(joined.length === 6) assert(joined.toSet === Set( (1, (Some(1), Some('x'))), (1, (Some(2), Some('x'))), @@ -406,14 +406,14 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))) val joined = rdd1.join(rdd2).collect() - assert(joined.size === 0) + assert(joined.length === 0) } test("join with many output partitions") { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.join(rdd2, 10).collect() - assert(joined.size === 4) + assert(joined.length === 4) assert(joined.toSet === Set( (1, (1, 'x')), (1, (2, 'x')), @@ -426,7 +426,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.groupWith(rdd2).collect() - assert(joined.size === 4) + assert(joined.length === 4) val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList))).toSet assert(joinedSet === Set( (1, (List(1, 2), List('x'))), @@ -441,7 +441,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val rdd3 = sc.parallelize(Seq((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd'))) val joined = rdd1.groupWith(rdd2, rdd3).collect() - assert(joined.size === 
4) + assert(joined.length === 4) val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList, x._2._3.toList))).toSet assert(joinedSet === Set( @@ -458,7 +458,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd3 = sc.parallelize(Seq((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd'))) val rdd4 = sc.parallelize(Seq((2, '@'))) val joined = rdd1.groupWith(rdd2, rdd3, rdd4).collect() - assert(joined.size === 4) + assert(joined.length === 4) val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList, x._2._3.toList, x._2._4.toList))).toSet assert(joinedSet === Set( @@ -492,14 +492,14 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val b = a.map(a => (a, (a * 2).toString)) // then a group by, and see we didn't revert to 2 partitions val c = b.groupByKey() - assert(c.partitions.size === 2000) + assert(c.partitions.length === 2000) } test("default partitioner uses largest partitioner") { val a = sc.makeRDD(Seq((1, "a"), (2, "b")), 2) val b = sc.makeRDD(Seq((1, "a"), (2, "b")), 2000) val c = a.join(b) - assert(c.partitions.size === 2000) + assert(c.partitions.length === 2000) } test("subtract") { @@ -507,7 +507,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val b = sc.parallelize(Array(2, 3, 4).toImmutableArraySeq, 4) val c = a.subtract(b) assert(c.collect().toSet === Set(1)) - assert(c.partitions.size === a.partitions.size) + assert(c.partitions.length === a.partitions.length) } test("subtract with narrow dependency") { @@ -531,7 +531,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val b = sc.parallelize(Seq((2, 20), (3, 30), (4, 40)), 4) val c = a.subtractByKey(b) assert(c.collect().toSet === Set((1, "a"), (1, "a"))) - assert(c.partitions.size === a.partitions.size) + assert(c.partitions.length === a.partitions.length) } test("subtractByKey with narrow dependency") { @@ -795,7 +795,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { assertBinomialSample(exact = exact, actual = v.toInt, trials = trials(k).toInt, p = samplingRate) } - assert(takeSample.size === takeSample.toSet.size) + assert(takeSample.length === takeSample.toSet.size) takeSample.foreach { x => assert(1 <= x._2 && x._2 <= n, s"elements not in [1, $n]") } } diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala index 3a097e5335a..7f12d8b624c 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala @@ -47,7 +47,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall val piped = nums.pipe(Seq("cat")) val c = piped.collect() - assert(c.size === 4) + assert(c.length === 4) assert(c(0) === "1") assert(c(1) === "2") assert(c(2) === "3") @@ -61,7 +61,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) { val c = piped.collect() - assert(c.size === 2) + assert(c.length === 2) assert(c(0).trim === "2") assert(c(1).trim === "2") } @@ -129,7 +129,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall val c = piped.collect() - assert(c.size === 8) + assert(c.length === 8) assert(c(0) === "0") assert(c(1) === "\u0001") assert(c(2) === "1_") @@ 
-151,7 +151,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall f(e + "_") } }).collect() - assert(d.size === 8) + assert(d.length === 8) assert(d(0) === "0") assert(d(1) === "\u0001") assert(d(2) === "b\t2_") @@ -216,7 +216,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall val nums = sc.makeRDD(Array(1, 2, 3, 4).toImmutableArraySeq, 2) val piped = nums.pipe(Seq("cat"), separateWorkingDir = true) val c = piped.collect() - assert(c.size === 4) + assert(c.length === 4) assert(c(0) === "1") assert(c(1) === "2") assert(c(2) === "3") diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 32ba2053258..706ebfa9364 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -322,7 +322,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { test("empty RDD") { val empty = new EmptyRDD[Int](sc) assert(empty.count() === 0) - assert(empty.collect().size === 0) + assert(empty.collect().length === 0) val thrown = intercept[UnsupportedOperationException]{ empty.reduce(_ + _) @@ -331,12 +331,12 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val emptyKv = new EmptyRDD[(Int, Int)](sc) val rdd = sc.parallelize(1 to 2, 2).map(x => (x, x)) - assert(rdd.join(emptyKv).collect().size === 0) - assert(rdd.rightOuterJoin(emptyKv).collect().size === 0) - assert(rdd.leftOuterJoin(emptyKv).collect().size === 2) - assert(rdd.fullOuterJoin(emptyKv).collect().size === 2) - assert(rdd.cogroup(emptyKv).collect().size === 2) - assert(rdd.union(emptyKv).collect().size === 2) + assert(rdd.join(emptyKv).collect().length === 0) + assert(rdd.rightOuterJoin(emptyKv).collect().length === 0) + assert(rdd.leftOuterJoin(emptyKv).collect().length === 2) + assert(rdd.fullOuterJoin(emptyKv).collect().length === 2) + assert(rdd.cogroup(emptyKv).collect().length === 2) + assert(rdd.union(emptyKv).collect().length === 2) } test("repartitioned RDDs") { @@ -348,7 +348,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { // Coalesce partitions val repartitioned1 = data.repartition(2) - assert(repartitioned1.partitions.size == 2) + assert(repartitioned1.partitions.length == 2) val partitions1 = repartitioned1.glom().collect() assert(partitions1(0).length > 0) assert(partitions1(1).length > 0) @@ -356,7 +356,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { // Split partitions val repartitioned2 = data.repartition(20) - assert(repartitioned2.partitions.size == 20) + assert(repartitioned2.partitions.length == 20) val partitions2 = repartitioned2.glom().collect() assert(partitions2(0).length > 0) assert(partitions2(19).length > 0) @@ -370,7 +370,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val data = sc.parallelize(input.toImmutableArraySeq, initialPartitions) val repartitioned1 = data.repartition(2) - assert(repartitioned1.partitions.size == 2) + assert(repartitioned1.partitions.length == 2) val partitions1 = repartitioned1.glom().collect() // some noise in balancing is allowed due to randomization assert(math.abs(partitions1(0).length - 500) < initialPartitions) @@ -380,7 +380,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { def testSplitPartitions(input: Seq[Int], initialPartitions: Int, finalPartitions: Int): Unit = { val data 
= sc.parallelize(input, initialPartitions) val repartitioned = data.repartition(finalPartitions) - assert(repartitioned.partitions.size === finalPartitions) + assert(repartitioned.partitions.length === finalPartitions) val partitions = repartitioned.glom().collect() // assert all elements are present assert(repartitioned.collect().sortWith(_ > _).toSeq === input.toSeq.sortWith(_ > _).toSeq) @@ -441,7 +441,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { // when shuffling, we can increase the number of partitions val coalesced6 = data.coalesce(20, shuffle = true) - assert(coalesced6.partitions.size === 20) + assert(coalesced6.partitions.length === 20) assert(coalesced6.collect().toSet === (1 to 10).toSet) } @@ -564,13 +564,13 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val coalesced2 = data2.coalesce(partitions) // test that we have 10000 partitions - assert(coalesced2.partitions.size == 10000, "Expected 10000 partitions, but got " + - coalesced2.partitions.size) + assert(coalesced2.partitions.length == 10000, "Expected 10000 partitions, but got " + + coalesced2.partitions.length) // test that we have 100 partitions val coalesced3 = data2.coalesce(numMachines * 2) - assert(coalesced3.partitions.size == 100, "Expected 100 partitions, but got " + - coalesced3.partitions.size) + assert(coalesced3.partitions.length == 100, "Expected 100 partitions, but got " + + coalesced3.partitions.length) // test that the groups are load balanced with 100 +/- 20 elements in each val maxImbalance3 = coalesced3.partitions @@ -613,9 +613,9 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val data = sc.parallelize(1 to 10, 10) // Note that split number starts from 0, so > 8 means only 10th partition left. 
val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8) - assert(prunedRdd.partitions.size === 1) + assert(prunedRdd.partitions.length === 1) val prunedData = prunedRdd.collect() - assert(prunedData.size === 1) + assert(prunedData.length === 1) assert(prunedData(0) === 10) } @@ -626,7 +626,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { test("take") { var nums = sc.makeRDD(Range(1, 1000), 1) - assert(nums.take(0).size === 0) + assert(nums.take(0).length === 0) assert(nums.take(1) === Array(1)) assert(nums.take(3) === Array(1, 2, 3)) assert(nums.take(500) === (1 to 500).toArray) @@ -635,7 +635,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { assert(nums.take(1000) === (1 to 999).toArray) nums = sc.makeRDD(Range(1, 1000), 2) - assert(nums.take(0).size === 0) + assert(nums.take(0).length === 0) assert(nums.take(1) === Array(1)) assert(nums.take(3) === Array(1, 2, 3)) assert(nums.take(500) === (1 to 500).toArray) @@ -644,7 +644,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { assert(nums.take(1000) === (1 to 999).toArray) nums = sc.makeRDD(Range(1, 1000), 100) - assert(nums.take(0).size === 0) + assert(nums.take(0).length === 0) assert(nums.take(1) === Array(1)) assert(nums.take(3) === Array(1, 2, 3)) assert(nums.take(500) === (1 to 500).toArray) @@ -653,7 +653,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { assert(nums.take(1000) === (1 to 999).toArray) nums = sc.makeRDD(Range(1, 1000), 1000) - assert(nums.take(0).size === 0) + assert(nums.take(0).length === 0) assert(nums.take(1) === Array(1)) assert(nums.take(3) === Array(1, 2, 3)) assert(nums.take(500) === (1 to 500).toArray) @@ -662,7 +662,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { assert(nums.take(1000) === (1 to 999).toArray) nums = sc.parallelize(1 to 2, 2) - assert(nums.take(2147483638).size === 2) + assert(nums.take(2147483638).length === 2) assert(nums.takeAsync(2147483638).get().size === 2) } @@ -670,7 +670,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val nums = Seq.range(1, 100000) val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2) val topK = ints.top(5) - assert(topK.size === 5) + assert(topK.length === 5) assert(topK === nums.reverse.take(5)) } @@ -679,7 +679,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { implicit val ord = implicitly[Ordering[String]].reverse val rdd = sc.makeRDD(words, 2) val topK = rdd.top(2) - assert(topK.size === 2) + assert(topK.length === 2) assert(topK.sorted === Array("b", "a")) } @@ -687,7 +687,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val rdd = sc.makeRDD(nums.toImmutableArraySeq, 2) val sortedLowerK = rdd.takeOrdered(5) - assert(sortedLowerK.size === 5) + assert(sortedLowerK.length === 5) assert(sortedLowerK === Array(1, 2, 3, 4, 5)) } @@ -695,7 +695,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val rdd = sc.makeRDD(nums.toImmutableArraySeq, 2) val sortedLowerK = rdd.takeOrdered(0) - assert(sortedLowerK.size === 0) + assert(sortedLowerK.length === 0) } test("SPARK-40276: takeOrdered with empty RDDs") { @@ -708,7 +708,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { implicit val ord = implicitly[Ordering[Int]].reverse 
val rdd = sc.makeRDD(nums.toImmutableArraySeq, 2) val sortedTopK = rdd.takeOrdered(5) - assert(sortedTopK.size === 5) + assert(sortedTopK.length === 5) assert(sortedTopK === Array(10, 9, 8, 7, 6)) assert(sortedTopK === nums.sorted(ord).take(5)) } @@ -736,48 +736,48 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { for (num <- List(5, 20, 100)) { val sample = data.takeSample(withReplacement = false, num = num) - assert(sample.size === num) // Got exactly num elements + assert(sample.length === num) // Got exactly num elements assert(sample.toSet.size === num) // Elements are distinct assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } for (seed <- 1 to 5) { val sample = data.takeSample(withReplacement = false, 20, seed) - assert(sample.size === 20) // Got exactly 20 elements + assert(sample.length === 20) // Got exactly 20 elements assert(sample.toSet.size === 20) // Elements are distinct assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } for (seed <- 1 to 5) { val sample = data.takeSample(withReplacement = false, 100, seed) - assert(sample.size === 100) // Got only 100 elements + assert(sample.length === 100) // Got only 100 elements assert(sample.toSet.size === 100) // Elements are distinct assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } for (seed <- 1 to 5) { val sample = data.takeSample(withReplacement = true, 20, seed) - assert(sample.size === 20) // Got exactly 20 elements + assert(sample.length === 20) // Got exactly 20 elements assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } { val sample = data.takeSample(withReplacement = true, num = 20) - assert(sample.size === 20) // Got exactly 20 elements + assert(sample.length === 20) // Got exactly 20 elements assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } { val sample = data.takeSample(withReplacement = true, num = n) - assert(sample.size === n) // Got exactly n elements + assert(sample.length === n) // Got exactly n elements // Chance of getting all distinct elements is astronomically low, so test we got < n assert(sample.toSet.size < n, "sampling with replacement returned all distinct elements") assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } for (seed <- 1 to 5) { val sample = data.takeSample(withReplacement = true, n, seed) - assert(sample.size === n) // Got exactly n elements + assert(sample.length === n) // Got exactly n elements // Chance of getting all distinct elements is astronomically low, so test we got < n assert(sample.toSet.size < n, "sampling with replacement returned all distinct elements") } for (seed <- 1 to 5) { val sample = data.takeSample(withReplacement = true, 2 * n, seed) - assert(sample.size === 2 * n) // Got exactly 2 * n elements + assert(sample.length === 2 * n) // Got exactly 2 * n elements // Chance of getting all distinct elements is still quite low, so test we got < n assert(sample.toSet.size < n, "sampling with replacement returned all distinct elements") } @@ -794,7 +794,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val data = sc.parallelize(1 to n, 2) for(seed <- 1 to 5) { val splits = data.randomSplit(Array(1.0, 2.0, 3.0), seed) - assert(splits.size == 3, "wrong number of splits") + assert(splits.length == 3, "wrong number of splits") assert(splits.flatMap(_.collect()).sorted.toList == data.collect().toList, "incomplete or wrong split") val s = splits.map(_.count()) @@ -1179,7 
+1179,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { sc.hadoopFile(outDir, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]) val coalescedHadoopRDD = hadoopRDD.coalesce(2, partitionCoalescer = Option(new SizeBasedCoalescer(maxSplitSize))) - assert(coalescedHadoopRDD.partitions.size <= 10) + assert(coalescedHadoopRDD.partitions.length <= 10) var totalPartitionCount = 0L coalescedHadoopRDD.partitions.foreach(partition => { var splitSizeSum = 0L @@ -1256,7 +1256,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { .map(coalescedRDD.getPreferredLocations(_).head) .groupBy(identity) .view - .mapValues(_.size) + .mapValues(_.length) // Make sure the coalesced partitions are distributed fairly evenly between the two locations. // This should not become flaky since the DefaultPartitionsCoalescer uses a fixed seed. @@ -1357,7 +1357,7 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Seria totalSum += splitSize } - while (index < partitions.size) { + while (index < partitions.length) { val partition = partitions(index) val fileSplit = partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit] diff --git a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala index 802889b0477..5771e99b64c 100644 --- a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala @@ -35,7 +35,7 @@ class SortingSuite extends SparkFunSuite with SharedSparkContext with Matchers { val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr.toImmutableArraySeq, 2) val sorted = pairs.sortByKey() - assert(sorted.partitions.size === 2) + assert(sorted.partitions.length === 2) assert(sorted.collect() === pairArr.sortBy(_._1)) } @@ -44,7 +44,7 @@ class SortingSuite extends SparkFunSuite with SharedSparkContext with Matchers { val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr.toImmutableArraySeq, 2) val sorted = pairs.sortByKey(true, 1) - assert(sorted.partitions.size === 1) + assert(sorted.partitions.length === 1) assert(sorted.collect() === pairArr.sortBy(_._1)) } @@ -53,7 +53,7 @@ class SortingSuite extends SparkFunSuite with SharedSparkContext with Matchers { val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr.toImmutableArraySeq, 2) val sorted = pairs.sortByKey(true, 20) - assert(sorted.partitions.size === 20) + assert(sorted.partitions.length === 20) assert(sorted.collect() === pairArr.sortBy(_._1)) } diff --git a/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala index 7079b9ea8ea..c04719eb9ea 100644 --- a/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.{SharedSparkContext, SparkFunSuite} object ZippedPartitionsSuite { def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]) : Iterator[Int] = { - Iterator(i.toArray.size, s.toArray.size, d.toArray.size) + Iterator(i.toArray.length, s.toArray.length, d.toArray.length) } } @@ -35,7 +35,7 @@ class ZippedPartitionsSuite extends SparkFunSuite with SharedSparkContext { val obtainedSizes = zippedRDD.collect() val expectedSizes = 
Array(2, 3, 1, 2, 3, 1) - assert(obtainedSizes.size == 6) + assert(obtainedSizes.length == 6) assert(obtainedSizes.zip(expectedSizes).forall(x => x._1 == x._2)) } } diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala index fd7018f189e..be38315cd75 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala @@ -374,7 +374,7 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar { rprof.require(eReq) // Update this if new resource type added - assert(ResourceProfile.allSupportedExecutorResources.size === 5, + assert(ResourceProfile.allSupportedExecutorResources.length === 5, "Executor resources should have 5 supported resources") assert(rprof.build().getCustomExecutorResources().size === 1, "Executor resources should have 1 custom resource") diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala index 1ab9f7c5d2b..20d6cc76715 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala @@ -101,13 +101,13 @@ class ResourceUtilsSuite extends SparkFunSuite val gpuValue = resources.get(GPU) assert(gpuValue.nonEmpty, "Should have a gpu entry") assert(gpuValue.get.name == "gpu", "name should be gpu") - assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes") + assert(gpuValue.get.addresses.length == 2, "Should have 2 indexes") assert(gpuValue.get.addresses.sameElements(Array("0", "1")), "should have 0,1 entries") val fpgaValue = resources.get(FPGA) assert(fpgaValue.nonEmpty, "Should have a gpu entry") assert(fpgaValue.get.name == "fpga", "name should be fpga") - assert(fpgaValue.get.addresses.size == 3, "Should have 3 indexes") + assert(fpgaValue.get.addresses.length == 3, "Should have 3 indexes") assert(fpgaValue.get.addresses.sameElements(Array("f1", "f2", "f3")), "should have f1,f2,f3 entries") } @@ -231,7 +231,7 @@ class ResourceUtilsSuite extends SparkFunSuite val gpuValue = resources.get(GPU) assert(gpuValue.nonEmpty, "Should have a gpu entry") assert(gpuValue.get.name == "gpu", "name should be gpu") - assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes") + assert(gpuValue.get.addresses.length == 2, "Should have 2 indexes") assert(gpuValue.get.addresses.sameElements(Array("0", "1")), "should have 0,1 entries") } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala b/core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala index 3f8eaede6e7..84f9ef0d557 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala @@ -48,7 +48,7 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A result } - override def numPartitions: Int = partitionStartIndices.size + override def numPartitions: Int = partitionStartIndices.length override def getPartition(key: Any): Int = { parentPartitionMapping(parent.getPartition(key)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index bf5e9d96cd8..e9b8ae4bffe 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -62,7 +62,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo } assert(thrown.getMessage.contains("using broadcast variables for large values")) val smaller = sc.parallelize(1 to 4).collect() - assert(smaller.size === 4) + assert(smaller.length === 4) } test("compute max number of concurrent tasks can be launched") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 0f7146bc7c1..c55f627075e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -462,9 +462,9 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti /** Send the given CompletionEvent messages for the tasks in the TaskSet. */ private def complete(taskSet: TaskSet, taskEndInfos: Seq[(TaskEndReason, Any)]): Unit = { - assert(taskSet.tasks.size >= taskEndInfos.size) + assert(taskSet.tasks.length >= taskEndInfos.size) for ((result, i) <- taskEndInfos.zipWithIndex) { - if (i < taskSet.tasks.size) { + if (i < taskSet.tasks.length) { runEvent(makeCompletionEvent(taskSet.tasks(i), result._1, result._2)) } } @@ -474,9 +474,9 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti accumId: Long, taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]): Unit = { - assert(taskSet.tasks.size >= results.size) + assert(taskSet.tasks.length >= results.size) for ((result, i) <- results.zipWithIndex) { - if (i < taskSet.tasks.size) { + if (i < taskSet.tasks.length) { runEvent(makeCompletionEvent( taskSet.tasks(i), result._1, @@ -1671,21 +1671,21 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti runEvent(makeCompletionEvent( taskSet.tasks(0), Success, - makeMapStatus("hostA", reduceRdd.partitions.size))) + makeMapStatus("hostA", reduceRdd.partitions.length))) assert(shuffleStage.numAvailableOutputs === 0) // should work because it's a non-failed host (so the available map outputs will increase) runEvent(makeCompletionEvent( taskSet.tasks(0), Success, - makeMapStatus("hostB", reduceRdd.partitions.size))) + makeMapStatus("hostB", reduceRdd.partitions.length))) assert(shuffleStage.numAvailableOutputs === 1) // should be ignored for being too old runEvent(makeCompletionEvent( taskSet.tasks(0), Success, - makeMapStatus("hostA", reduceRdd.partitions.size))) + makeMapStatus("hostA", reduceRdd.partitions.length))) assert(shuffleStage.numAvailableOutputs === 1) // should work because it's a new epoch, which will increase the number of available map @@ -1694,7 +1694,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti runEvent(makeCompletionEvent( taskSet.tasks(1), Success, - makeMapStatus("hostA", reduceRdd.partitions.size))) + makeMapStatus("hostA", reduceRdd.partitions.length))) assert(shuffleStage.numAvailableOutputs === 2) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) @@ -2081,7 +2081,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // stage complete), but the tasks that ran on HostA need to be re-run, so the DAGScheduler // should re-submit the stage with one 
task (the task that originally ran on HostA). assert(taskSets.size === 2) - assert(taskSets(1).tasks.size === 1) + assert(taskSets(1).tasks.length === 1) // Make sure that the stage that was re-submitted was the ShuffleMapStage (not the reduce // stage, which shouldn't be run until all of the tasks in the ShuffleMapStage complete on @@ -2735,7 +2735,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // Now complete tasks in the second task set val newTaskSet = taskSets(1) // 2 tasks should have been re-submitted, for tasks 0 and 1 (which ran on hostA). - assert(newTaskSet.tasks.size === 2) + assert(newTaskSet.tasks.length === 2) // Complete task 0 from the original task set (i.e., not the one that's currently active). // This should still be counted towards the job being complete (but there's still one // outstanding task). @@ -2878,7 +2878,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // failed hostA, so both should be resubmitted. Complete them on hostB successfully. scheduler.resubmitFailedStages() assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1 - && taskSets(2).tasks.size === 2) + && taskSets(2).tasks.length === 2) complete(taskSets(2), Seq( (Success, makeMapStatus("hostB", 2)), (Success, makeMapStatus("hostB", 2)))) @@ -2898,7 +2898,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt // is still running. assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 - && taskSets(3).tasks.size === 2) + && taskSets(3).tasks.length === 2) runEvent(makeCompletionEvent( taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2))) @@ -2907,7 +2907,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // was ignored due to executor failure assert(taskSets.size === 5) assert(taskSets(4).stageId === 1 && taskSets(4).stageAttemptId === 2 - && taskSets(4).tasks.size === 1) + && taskSets(4).tasks.length === 1) // Complete task(stageId=1, stageAttempt=2, partitionId=1) successfully. runEvent(makeCompletionEvent( @@ -4445,7 +4445,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // a scenario where stage 0 needs to be resubmitted upon finishing all tasks. // Merge finalization should be scheduled in this case. for ((result, i) <- taskResults.zipWithIndex) { - if (i == taskSets(0).tasks.size - 1) { + if (i == taskSets(0).tasks.length - 1) { mapOutputTracker.removeOutputsOnHost("host0") } runEvent(makeCompletionEvent(taskSets(0).tasks(i), result._1, result._2)) @@ -4522,7 +4522,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // a scenario where stage 0 needs to be resubmitted upon finishing all tasks. // Merge finalization should be scheduled in this case. for ((result, i) <- taskResults.zipWithIndex) { - if (i == taskSets(0).tasks.size - 1) { + if (i == taskSets(0).tasks.length - 1) { mapOutputTracker.removeOutputsOnHost("host0") } runEvent(makeCompletionEvent(taskSets(0).tasks(i), result._1, result._2)) @@ -4986,7 +4986,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti * Note that this checks only the host and not the executor ID. 
*/ private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]): Unit = { - assert(hosts.size === taskSet.tasks.size) + assert(hosts.size === taskSet.tasks.length) for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) { assert(taskLocs.map(_.host).toSet === expectedLocs.toSet) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index cf2240a0511..13e7ff758eb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -268,7 +268,7 @@ class MapStatusSuite extends SparkFunSuite { "number of skewed block sizes") val smallAndUntrackedBlocks = - nonEmptyBlocks.slice(0, nonEmptyBlocks.size - trackedSkewedBlocksLength) + nonEmptyBlocks.slice(0, nonEmptyBlocks.length - trackedSkewedBlocksLength) val avg = smallAndUntrackedBlocks.sum / smallAndUntrackedBlocks.length val loc = BlockManagerId("a", "b", 10) diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 0533f9d7d8a..f1a4b97c298 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -143,14 +143,14 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { val rdd = sc.parallelize(Seq(1), 1) sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).commitSuccessfully _, rdd.partitions.indices) - assert(tempDir.list().size === 1) + assert(tempDir.list().length === 1) } ignore("If commit fails, if task is retried it should not be locked, and will succeed.") { val rdd = sc.parallelize(Seq(1), 1) sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).failFirstCommitAttempt _, rdd.partitions.indices) - assert(tempDir.list().size === 1) + assert(tempDir.list().length === 1) } test("Job should not complete if all commits are denied") { @@ -161,13 +161,13 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { def resultHandler(x: Int, y: Unit): Unit = {} val futureAction: SimpleFutureAction[Unit] = sc.submitJob[Int, Unit, Unit](rdd, OutputCommitFunctions(tempDir.getAbsolutePath).commitSuccessfully, - 0 until rdd.partitions.size, resultHandler, ()) + 0 until rdd.partitions.length, resultHandler, ()) // It's an error if the job completes successfully even though no committer was authorized, // so throw an exception if the job was allowed to complete. 
intercept[TimeoutException] { ThreadUtils.awaitResult(futureAction, 5.seconds) } - assert(tempDir.list().size === 0) + assert(tempDir.list().length === 0) } test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index f0ae7fc7411..2ab7df0d9cf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1815,10 +1815,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext var has1Gpu = 0 for (tDesc <- taskDescriptions) { assert(tDesc.resources.contains(GPU)) - if (tDesc.resources(GPU).addresses.size == 2) { + if (tDesc.resources(GPU).addresses.length == 2) { has2Gpus += 1 } - if (tDesc.resources(GPU).addresses.size == 1) { + if (tDesc.resources(GPU).addresses.length == 1) { has1Gpu += 1 } } @@ -1836,7 +1836,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten assert(2 === taskDescriptions.length) assert(taskDescriptions.head.resources.contains(GPU)) - assert(2 == taskDescriptions.head.resources(GPU).addresses.size) + assert(2 == taskDescriptions.head.resources(GPU).addresses.length) } test("Scheduler works with task resource profiles") { @@ -1875,10 +1875,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext var has1Gpu = 0 for (tDesc <- taskDescriptions) { assert(tDesc.resources.contains(GPU)) - if (tDesc.resources(GPU).addresses.size == 2) { + if (tDesc.resources(GPU).addresses.length == 2) { has2Gpus += 1 } - if (tDesc.resources(GPU).addresses.size == 1) { + if (tDesc.resources(GPU).addresses.length == 1) { has1Gpu += 1 } } @@ -1896,7 +1896,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten assert(2 === taskDescriptions.length) assert(taskDescriptions.head.resources.contains(GPU)) - assert(2 == taskDescriptions.head.resources(GPU).addresses.size) + assert(2 == taskDescriptions.head.resources(GPU).addresses.length) } test("Calculate available tasks slots for task resource profiles") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 2fe50a486db..2f8b6df8bea 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -845,7 +845,7 @@ class TaskSetManagerSuite // multiple 1k result val r = sc.makeRDD(0 until 10, 10).map(genBytes(1024)).collect() - assert(10 === r.size) + assert(10 === r.length) // single 10M result val thrown = intercept[SparkException] {sc.makeRDD(genBytes(10 << 20)(0), 1).collect()} @@ -863,7 +863,7 @@ class TaskSetManagerSuite sc = new SparkContext("local", "test", conf) // final result is below limit. 
val r = sc.makeRDD(0 until 2000, 2000).distinct(10).filter(_ == 0).collect() - assert(1 === r.size) + assert(1 === r.length) } test("[SPARK-13931] taskSetManager should not send Resubmitted tasks after being a zombie") { diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala index 4acb4bbc779..25db9a5c686 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala @@ -48,7 +48,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex val shuffledRDD = cachedRDD.map { case (i, o) => (i * i * i - 10 * i * i, o)} // Join the two RDDs, and force evaluation - assert(shuffledRDD.join(cachedRDD).collect().size == 1) + assert(shuffledRDD.join(cachedRDD).collect().length == 1) } } diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala index 8a9537b4f18..a9ca9135f38 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala @@ -236,7 +236,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite { ShuffleMergedBlockId(shuffleId, shuffleMergeId, reduceId), dirs) assert(mergedBlockMeta.getNumChunks === 3) - assert(mergedBlockMeta.readChunkBitmaps().size === 3) + assert(mergedBlockMeta.readChunkBitmaps().length === 3) assert(mergedBlockMeta.readChunkBitmaps()(0).contains(1)) assert(mergedBlockMeta.readChunkBitmaps()(0).contains(2)) assert(!mergedBlockMeta.readChunkBitmaps()(0).contains(3)) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index be1b9be2d85..b6442246522 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -117,7 +117,7 @@ class DiskStoreSuite extends SparkFunSuite { val chunkedByteBuffer = blockData.toChunkedByteBuffer(ByteBuffer.allocate) val chunks = chunkedByteBuffer.chunks - assert(chunks.size === 2) + assert(chunks.length === 2) for (chunk <- chunks) { assert(chunk.limit() === 10 * 1024) } diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala index c377f2495d0..35ef0587b9b 100644 --- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala @@ -192,9 +192,9 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter { // verify whether the earliest file has been deleted val rolledOverFiles = allGeneratedFiles.filter { _ != testFile.toString }.toArray.sorted - logInfo(s"All rolled over files generated:${rolledOverFiles.size}\n" + + logInfo(s"All rolled over files generated:${rolledOverFiles.length}\n" + rolledOverFiles.mkString("\n")) - assert(rolledOverFiles.size > 2) + assert(rolledOverFiles.length > 2) val earliestRolledOverFile = rolledOverFiles.head val existingRolledOverFiles = RollingFileAppender.getSortedRolledOverFiles( testFile.getParentFile.toString, testFile.getName).map(_.toString) diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala index 8aa4be6c2ff..82a4c85b02f 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala @@ -104,7 +104,7 @@ private object SizeTrackerSuite { * Run speed tests for size tracking collections. */ def main(args: Array[String]): Unit = { - if (args.size < 1) { + if (args.length < 1) { // scalastyle:off println println("Usage: SizeTrackerSuite [num elements]") // scalastyle:on println
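
Note (not part of the commit): a minimal standalone sketch of the distinction the diff above relies on. On JVM arrays and strings, `.size` is supplied by an implicit wrapper (ArrayOps / StringOps in Scala 2.13) whose size method simply forwards to length, so calling `.length` directly skips one extra call. The object and value names below are illustrative only and do not come from the repository.

object SizeVsLengthSketch {
  def main(args: Array[String]): Unit = {
    val xs = Array(1, 2, 3)
    val s = "spark"

    // `.size` compiles through the implicit ArrayOps/StringOps wrappers,
    // whose size methods delegate to the underlying length
    val viaSize = xs.size + s.size

    // `.length` is the direct accessor on the array / String, with no wrapper call in between
    val viaLength = xs.length + s.length

    // Both spellings return the same values; only the indirection differs
    assert(viaSize == viaLength)
    println(s"size: $viaSize, length: $viaLength")
  }
}

In test code such as the suites touched above, the rewrite is purely mechanical: an assertion like assert(xs.size === n) keeps exactly the same meaning when written as assert(xs.length === n), which is why the existing test runs are sufficient to validate the change.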