This is an automated email from the ASF dual-hosted git repository. beliefer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6f46ea2f9bb [SPARK-45513][CORE][SQL][MLLIB][CONNECT] Replace `scala.runtime.Tuple2Zipped` to `scala.collection.LazyZip2` 6f46ea2f9bb is described below commit 6f46ea2f9bbad71077f9f2dbf72fa4e6906ef29a Author: Jiaan Geng <belie...@163.com> AuthorDate: Sat Oct 14 11:22:40 2023 +0800 [SPARK-45513][CORE][SQL][MLLIB][CONNECT] Replace `scala.runtime.Tuple2Zipped` to `scala.collection.LazyZip2` ### What changes were proposed in this pull request? Since Scala 2.13.0, `scala.runtime.Tuple2Zipped` has been marked as deprecated and `scala.collection.LazyZip2` is recommended instead. ### Why are the changes needed? Replace `scala.runtime.Tuple2Zipped` with `scala.collection.LazyZip2` ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? Existing test cases. ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes #43351 from beliefer/SPARK-45513. Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Jiaan Geng <belie...@163.com> --- .../org/apache/spark/sql/test/SQLHelper.scala | 2 +- .../scheduler/EventLoggingListenerSuite.scala | 2 +- .../spark/shuffle/ShuffleBlockPusherSuite.scala | 6 +++--- .../mllib/feature/ElementwiseProductSuite.scala | 2 +- .../spark/mllib/feature/NormalizerSuite.scala | 6 +++--- .../spark/mllib/feature/StandardScalerSuite.scala | 24 +++++++++++----------- .../spark/mllib/optimization/LBFGSSuite.scala | 2 +- .../catalyst/optimizer/InjectRuntimeFilter.scala | 2 +- .../spark/sql/catalyst/plans/SQLHelper.scala | 2 +- .../columnar/compression/IntegralDeltaSuite.scala | 4 ++-- .../datasources/parquet/ParquetSchemaSuite.scala | 2 +- .../sql/execution/joins/HashedRelationSuite.scala | 24 ++++++++++++++++------ 12 files changed, 45 insertions(+), 33 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala 
index 12212492e37..727e2a4f420 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala @@ -41,7 +41,7 @@ trait SQLHelper { None } } - (keys, values).zipped.foreach { (k, v) => + keys.lazyZip(values).foreach { (k, v) => if (spark.conf.isModifiable(k)) { spark.conf.set(k, v) } else { diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index edc54e60654..bd659363e53 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -385,7 +385,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit 8000L, 5000L, 7000L, 4000L, 6000L, 3000L, 10L, 90L, 2L, 20L, 110L) def max(a: Array[Long], b: Array[Long]): Array[Long] = - (a, b).zipped.map(Math.max).toArray + a.lazyZip(b).map(Math.max).toArray // calculated metric peaks per stage per executor // metrics sent during stage 0 for each executor diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala index c8d89625dd8..18c27ff1269 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala @@ -82,7 +82,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { pushedBlocks ++= blocks val managedBuffers = invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]] val blockPushListener = invocation.getArguments()(4).asInstanceOf[BlockPushingListener] - (blocks, managedBuffers).zipped.foreach((blockId, buffer) => { + blocks.lazyZip(managedBuffers).foreach((blockId, buffer) => { 
blockPushListener.onBlockPushSuccess(blockId, buffer) }) }) @@ -91,7 +91,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { private def verifyPushRequests( pushRequests: Seq[PushRequest], expectedSizes: Seq[Int]): Unit = { - (pushRequests, expectedSizes).zipped.foreach((req, size) => { + pushRequests.lazyZip(expectedSizes).foreach((req, size) => { assert(req.size == size) }) } @@ -256,7 +256,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite { // blocks to be deferred blockPushListener.onBlockPushSuccess(blocks(0), managedBuffers(0)) } else { - (blocks, managedBuffers).zipped.foreach((blockId, buffer) => { + blocks.lazyZip(managedBuffers).foreach((blockId, buffer) => { blockPushListener.onBlockPushSuccess(blockId, buffer) }) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/ElementwiseProductSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/ElementwiseProductSuite.scala index ccbf8a91cdd..9eca2d682d6 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/ElementwiseProductSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/ElementwiseProductSuite.scala @@ -54,7 +54,7 @@ class ElementwiseProductSuite extends SparkFunSuite with MLlibTestSparkContext { case _ => false }, "The vector type should be preserved after hadamard product") - assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) assert(data2(0) ~== Vectors.sparse(3, Seq((1, 0.0), (2, -1.5))) absTol 1E-5) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala index 10f7bafd6cf..71ce26360b8 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala @@ -49,7 +49,7 @@ class NormalizerSuite extends 
SparkFunSuite with MLlibTestSparkContext { case _ => false }, "The vector type should be preserved after normalization.") - assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) assert(brzNorm(data1(0).asBreeze, 1) ~== 1.0 absTol 1E-5) assert(brzNorm(data1(2).asBreeze, 1) ~== 1.0 absTol 1E-5) @@ -76,7 +76,7 @@ class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext { case _ => false }, "The vector type should be preserved after normalization.") - assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) assert(brzNorm(data2(0).asBreeze, 2) ~== 1.0 absTol 1E-5) assert(brzNorm(data2(2).asBreeze, 2) ~== 1.0 absTol 1E-5) @@ -103,7 +103,7 @@ class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext { case _ => false }, "The vector type should be preserved after normalization.") - assert((dataInf, dataInfRDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(dataInf.lazyZip(dataInfRDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) assert(dataInf(0).toArray.map(math.abs).max ~== 1.0 absTol 1E-5) assert(dataInf(2).toArray.map(math.abs).max ~== 1.0 absTol 1E-5) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala index a5769631e51..a2c72de4231 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala @@ -105,9 +105,9 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext { case _ => false }, "The vector type should be preserved after standardization.") - assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) - 
assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) - assert((data3, data3RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data3.lazyZip(data3RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) assert(summary1.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5) assert(summary1.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5) @@ -169,9 +169,9 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext { case _ => false }, "The vector type should be preserved after standardization.") - assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) - assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) - assert((data3, data3RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data3.lazyZip(data3RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) assert(summary1.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5) assert(summary1.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5) @@ -225,9 +225,9 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext { case _ => false }, "The vector type should be preserved after standardization.") - assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) - assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) - assert((data3, data3RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 
absTol 1E-5)) + assert(data3.lazyZip(data3RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) assert(summary1.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5) assert(summary1.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5) @@ -274,9 +274,9 @@ class StandardScalerSuite extends SparkFunSuite with MLlibTestSparkContext { case _ => false }, "The vector type should be preserved after standardization.") - assert((data1, data1RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) - assert((data2, data2RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) - assert((data3, data3RDD.collect()).zipped.forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data1.lazyZip(data1RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data2.lazyZip(data2RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) + assert(data3.lazyZip(data3RDD.collect()).forall((v1, v2) => v1 ~== v2 absTol 1E-5)) assert(summary1.mean ~== Vectors.dense(0.0, 0.0, 0.0) absTol 1E-5) assert(summary1.variance ~== Vectors.dense(1.0, 1.0, 1.0) absTol 1E-5) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala index 27e21acc275..1318b23d28c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala @@ -71,7 +71,7 @@ class LBFGSSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers // Since the cost function is convex, the loss is guaranteed to be monotonically decreasing // with L-BFGS optimizer. // (SGD doesn't guarantee this, and the loss will be fluctuating in the optimization process.) 
- assert((loss, loss.tail).zipped.forall(_ > _), "loss should be monotonically decreasing.") + assert(loss.lazyZip(loss.tail).forall(_ > _), "loss should be monotonically decreasing.") val stepSize = 1.0 // Well, GD converges slower, so it requires more iterations! diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala index 13554908379..614ab4a1d01 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala @@ -311,7 +311,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J case join @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, hint) => var newLeft = left var newRight = right - (leftKeys, rightKeys).zipped.foreach((l, r) => { + leftKeys.lazyZip(rightKeys).foreach((l, r) => { // Check if: // 1. There is already a DPP filter on the key // 2. 
There is already a runtime filter (Bloom filter or IN subquery) on the key diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala index 75fe61c4980..eb844e6f057 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala @@ -45,7 +45,7 @@ trait SQLHelper { None } } - (keys, values).zipped.foreach { (k, v) => + keys.lazyZip(values).foreach { (k, v) => if (SQLConf.isStaticConfigKey(k)) { throw new AnalysisException(s"Cannot modify the value of a static config: $k") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala index 655a9d7ff46..385c122e804 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala @@ -43,7 +43,7 @@ class IntegralDeltaSuite extends SparkFunSuite { val deltas = if (input.isEmpty) { Seq.empty[Long] } else { - (input.tail, input.init).zipped.map { + input.tail.lazyZip(input.init).map { case (x: Int, y: Int) => (x - y).toLong case (x: Long, y: Long) => x - y case other => fail(s"Unexpected input $other") @@ -80,7 +80,7 @@ class IntegralDeltaSuite extends SparkFunSuite { assertResult(Byte.MinValue, "The first byte should be an escaping mark")(buffer.get()) assertResult(input.head, "The first value is wrong")(columnType.extract(buffer)) - (input.tail, deltas).zipped.foreach { (value, delta) => + input.tail.lazyZip(deltas).foreach { (value, delta) => if (math.abs(delta) <= Byte.MaxValue) { assertResult(delta, "Wrong delta")(buffer.get()) } else { diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index facc9b90ff7..ef06e64d2eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -968,7 +968,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { val fromCaseClassString = StructType.fromString(caseClassString) val fromJson = StructType.fromString(jsonString) - (fromCaseClassString, fromJson).zipped.foreach { (a, b) => + fromCaseClassString.lazyZip(fromJson).foreach { (a, b) => assert(a.name == b.name) assert(a.dataType === b.dataType) assert(a.nullable === b.nullable) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala index 686fb2d838b..69b07e6b6b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala @@ -467,7 +467,9 @@ class HashedRelationSuite extends SharedSparkSession { test("LongToUnsafeRowMap: key set iterator on a contiguous array of keys") { val rowMap = new LongToUnsafeRowMap(mm, 1) - (contiguousArray, contiguousRows).zipped.map { (i, row) => rowMap.append(i, row) } + contiguousArray.toArray.lazyZip(contiguousRows).map { + (i, row) => rowMap.append(i, row) + } var keyIterator = rowMap.keys() // in sparse mode the keys are unsorted assert(keyIterator.map(key => key.getLong(0)).toArray.sortWith(_ < _) === contiguousArray) @@ -479,7 +481,9 @@ class HashedRelationSuite extends SharedSparkSession { test("LongToUnsafeRowMap: key set iterator on a sparse array with equidistant keys") { val rowMap = new 
LongToUnsafeRowMap(mm, 1) - (sparseArray, sparseRows).zipped.map { (i, row) => rowMap.append(i, row) } + sparseArray.toArray.lazyZip(sparseRows).map { + (i, row) => rowMap.append(i, row) + } var keyIterator = rowMap.keys() assert(keyIterator.map(_.getLong(0)).toArray.sortWith(_ < _) === sparseArray) rowMap.optimize() @@ -503,7 +507,9 @@ class HashedRelationSuite extends SharedSparkSession { test("LongToUnsafeRowMap: multiple hasNext calls before calling next() on the key iterator") { val rowMap = new LongToUnsafeRowMap(mm, 1) - (randomArray, randomRows).zipped.map { (i, row) => rowMap.append(i, row) } + randomArray.toArray.lazyZip(randomRows).map { + (i, row) => rowMap.append(i, row) + } val buffer = new ArrayBuffer[Long]() // hasNext should not change the cursor unless the key was read by a next() call var keyIterator = rowMap.keys() @@ -527,7 +533,9 @@ class HashedRelationSuite extends SharedSparkSession { test("LongToUnsafeRowMap: no explicit hasNext calls on the key iterator") { val rowMap = new LongToUnsafeRowMap(mm, 1) - (randomArray, randomRows).zipped.map { (i, row) => rowMap.append(i, row) } + randomArray.toArray.lazyZip(randomRows).map { + (i, row) => rowMap.append(i, row) + } val buffer = new ArrayBuffer[Long]() // call next() until the buffer is filled with all keys var keyIterator = rowMap.keys() @@ -555,7 +563,9 @@ class HashedRelationSuite extends SharedSparkSession { test("LongToUnsafeRowMap: call hasNext at the end of the iterator") { val rowMap = new LongToUnsafeRowMap(mm, 1) - (sparseArray, sparseRows).zipped.map { (i, row) => rowMap.append(i, row) } + sparseArray.toArray.lazyZip(sparseRows).map { + (i, row) => rowMap.append(i, row) + } var keyIterator = rowMap.keys() assert(keyIterator.map(key => key.getLong(0)).toArray.sortWith(_ < _) === sparseArray) assert(keyIterator.hasNext == false) @@ -570,7 +580,9 @@ class HashedRelationSuite extends SharedSparkSession { test("LongToUnsafeRowMap: random sequence of hasNext and next() calls on the key 
iterator") { val rowMap = new LongToUnsafeRowMap(mm, 1) - (randomArray, randomRows).zipped.map { (i, row) => rowMap.append(i, row) } + randomArray.toArray.lazyZip(randomRows).map { + (i, row) => rowMap.append(i, row) + } val buffer = new ArrayBuffer[Long]() // call hasNext or next() at random var keyIterator = rowMap.keys() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org