This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 5db7beb59a67 [SPARK-46455][CORE][SQL][SS][CONNECT][PYTHON] Remove redundant type conversion
5db7beb59a67 is described below

commit 5db7beb59a673f05e8f39aa9653cb0497a6c97cf
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sun Dec 24 14:44:14 2023 -0800

    [SPARK-46455][CORE][SQL][SS][CONNECT][PYTHON] Remove redundant type conversion

    ### What changes were proposed in this pull request?
    This PR aims to clean up redundant type conversions in Spark production code.

    ### Why are the changes needed?
    Code cleanup.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Pass GitHub Actions

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #44412 from LuciferYang/cleanup-redundant-conversion.

    Lead-authored-by: yangjie01 <yangji...@baidu.com>
    Co-authored-by: YangJie <yangji...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala  | 2 +-
 .../spark/sql/connect/execution/ExecuteGrpcResponseSender.scala      | 4 ++--
 .../spark/sql/connect/execution/ExecuteResponseObserver.scala        | 2 +-
 .../spark/sql/connect/service/SparkConnectExecutionManager.scala     | 4 ++--
 .../org/apache/spark/examples/streaming/KinesisWordCountASL.scala    | 4 ++--
 .../scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala  | 2 +-
 core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala       | 2 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala                   | 2 +-
 core/src/main/scala/org/apache/spark/status/AppStatusStore.scala     | 2 +-
 mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala         | 2 +-
 sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala     | 4 ++--
 .../scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala   | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala           | 2 +-
 .../src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala    | 2 +-
 .../org/apache/spark/sql/execution/adaptive/QueryStageExec.scala     | 2 +-
 .../spark/sql/execution/command/AnalyzePartitionCommand.scala        | 2 +-
 .../apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala  | 2 +-
 .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala     | 6 +++---
 18 files changed, 24 insertions(+), 24 deletions(-)
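To illustrate the pattern this patch removes, here is a minimal, hypothetical Scala
sketch (an illustration only, not code taken from the diff below): calling .toSeq on a
value that is already a Seq, or .toLong on a value that is already a Long, is an
identity conversion and can simply be dropped.

    // Hypothetical sketch of the kind of redundant conversions removed by this patch.
    object RedundantConversionSketch {
      def main(args: Array[String]): Unit = {
        // map on a Seq already returns a Seq, so the trailing .toSeq is a no-op.
        val types: Seq[String] = Seq("INT", "LONG")
        val withNoOp = types.map(_.toLowerCase).toSeq
        val cleaned = types.map(_.toLowerCase)
        assert(withNoOp == cleaned)

        // A value already typed as Long; .toLong returns it unchanged.
        val timeoutMs: Long = 30000L
        assert(timeoutMs.toLong == timeoutMs)

        println(s"cleaned = $cleaned, timeoutMs = $timeoutMs")
      }
    }

The same reasoning applies to the .toMap, .toInt, .toIndexedSeq, and .toString calls
deleted below: each expression already has the target type, so the conversion adds
nothing and only obscures the code.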
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 29b9fdf9dfb9..9e10fac8bb55 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -327,7 +327,7 @@ private[sql] class AvroDeserializer(
     if (nonNullTypes.length == 1) {
       newWriter(nonNullTypes.head, catalystType, avroPath, catalystPath)
     } else {
-      nonNullTypes.map(_.getType).toSeq match {
+      nonNullTypes.map(_.getType) match {
         case Seq(a, b) if Set(a, b) == Set(INT, LONG) && catalystType == LongType =>
           (updater, ordinal, value) => value match {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index 115cedfe1128..c9ceef969e29 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -158,7 +158,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
       Long.MaxValue
     } else {
       val confSize =
-        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION).toLong
+        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION)
       if (confSize > 0) System.currentTimeMillis() + confSize else Long.MaxValue
     }
@@ -167,7 +167,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
       Long.MaxValue
     } else {
       val confSize =
-        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE).toLong
+        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE)
       if (confSize > 0) confSize else Long.MaxValue
     }
     var sentResponsesSize: Long = 0
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index b5844486b73a..a7877503f461 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -102,7 +102,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
    * value greater than 0 will buffer the response from getResponse.
    */
   private val retryBufferSize = if (executeHolder.reattachable) {
-    SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE).toLong
+    SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE)
   } else {
     0
   }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index d8d9cee3dad4..c90f53ac07df 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -188,13 +188,13 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
     scheduledExecutor match {
       case Some(_) => // Already running.
      case None =>
-        val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL).toLong
+        val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL)
        logInfo(s"Starting thread for cleanup of abandoned executions every $interval ms")
        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
        scheduledExecutor.get.scheduleAtFixedRate(
          () => {
            try {
-              val timeout = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT).toLong
+              val timeout = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT)
              periodicMaintenance(timeout)
            } catch {
              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
index 7d12af3256f1..4835e9de086c 100644
--- a/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -230,9 +230,9 @@ object KinesisWordProducerASL {
     // Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord
     for (i <- 1 to 10) {
       // Generate recordsPerSec records to put onto the stream
-      val records = (1 to recordsPerSecond.toInt).foreach { recordNum =>
+      val records = (1 to recordsPerSecond).foreach { recordNum =>
         // Randomly generate wordsPerRecord number of words
-        val data = (1 to wordsPerRecord.toInt).map(x => {
+        val data = (1 to wordsPerRecord).map(x => {
           // Get a random index to a word
           val randomWordIdx = Random.nextInt(randomWords.size)
           val randomWord = randomWords(randomWordIdx)
diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index d0eee9c83c20..406c19be9bff 100644
--- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -130,7 +130,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
     val producer = getProducer(aggregate)
     val shardIdToSeqNumbers = producer.sendData(streamName, testData)
     logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
-    shardIdToSeqNumbers.toMap
+    shardIdToSeqNumbers
   }
   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 7a9c0263631f..2d72b4dd6bf2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -104,7 +104,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
      }
      val left = num - results.size
-      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
      val buf = new Array[Array[T]](p.size)
      self.context.setCallSite(callSite)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index fe10e140f82d..9518433a7f69 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1486,7 +1486,7 @@ abstract class RDD[T: ClassTag](
        }
      }
-      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
      res.foreach(buf ++= _.take(num - buf.size))
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index c1c36d7a9f04..d50b8f935d56 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -371,7 +371,7 @@ private[spark] class AppStatusStore(
          Double.NaN
        }
      }
-    }.toIndexedSeq
+    }
  }
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala b/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
index 5e2b8943ed84..9e085c7078e6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
@@ -37,7 +37,7 @@ object MLUtil {
    val destFSPath = new FSPath(destPath)
    val fs = destFSPath.getFileSystem(hadoopConf)
-    fs.copyFromLocalFile(false, true, new FSPath(localPath.toString), destFSPath)
+    fs.copyFromLocalFile(false, true, new FSPath(localPath), destFSPath)
  }
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index dcb80221b0e0..17be8cfa12b5 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -139,7 +139,7 @@ object Metadata {
      case (key, JInt(value)) =>
        builder.putLong(key, value.toLong)
      case (key, JLong(value)) =>
-        builder.putLong(key, value.toLong)
+        builder.putLong(key, value)
      case (key, JDouble(value)) =>
        builder.putDouble(key, value)
      case (key, JBool(value)) =>
@@ -157,7 +157,7 @@
          case _: JInt =>
            builder.putLongArray(key, value.asInstanceOf[List[JInt]].map(_.num.toLong).toArray)
          case _: JLong =>
-            builder.putLongArray(key, value.asInstanceOf[List[JLong]].map(_.num.toLong).toArray)
+            builder.putLongArray(key, value.asInstanceOf[List[JLong]].map(_.num).toArray)
          case _: JDouble =>
            builder.putDoubleArray(key, value.asInstanceOf[List[JDouble]].map(_.num).toArray)
          case _: JBool =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 3fd1fe04aed6..bb2b7e7ae066 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -105,7 +105,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
      functionName: String,
      argumentName: String,
      candidates: Seq[String]): Throwable = {
    import org.apache.spark.sql.catalyst.util.StringUtils.orderSuggestedIdentifiersBySimilarity
-    val inputs = candidates.map(candidate => Seq(candidate)).toSeq
+    val inputs = candidates.map(candidate => Seq(candidate))
    val recommendations = orderSuggestedIdentifiersBySimilarity(argumentName, inputs)
      .take(3)
    new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a3dc976647be..31e1495db7e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1435,7 +1435,7 @@ class Dataset[T] private[sql](
      case s: Symbol => Column(s.name).expr
      case e: Expression => e
      case literal => Literal(literal)
-    }.toSeq
+    }
    UnresolvedHint(name, exprs, logicalPlan)
  }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index c65d1931dd1b..7bc770a0c9e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -518,7 +518,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
        }
      }
-      val parts = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+      val parts = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
      val partsToScan = if (takeFromEnd) {
        // Reverse partitions to scan. So, if parts was [1, 2, 3] in 200 partitions (0 to 199),
        // it becomes [198, 197, 196].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 89e9de8b0843..88954d6f822d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -285,7 +285,7 @@ case class TableCacheQueryStageExec(
    sparkContext.submitJob(
      rdd,
      (_: Iterator[CachedBatch]) => (),
-      (0 until rdd.getNumPartitions).toSeq,
+      (0 until rdd.getNumPartitions),
      (_: Int, _: Unit) => (),
      ()
    )
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index 98a851f19f05..4efd94e442e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -65,7 +65,7 @@ case class AnalyzePartitionCommand(
      if (filteredSpec.isEmpty) {
        None
      } else {
-        Some(filteredSpec.toMap)
+        Some(filteredSpec)
      }
    }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
index c4e12d5c4ae0..2cf299f87c89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
@@ -83,7 +83,7 @@ trait OrcFiltersBase {
        .groupBy(_._1.toLowerCase(Locale.ROOT))
        .filter(_._2.size == 1)
        .transform((_, v) => v.head._2)
-      CaseInsensitiveMap(dedupPrimitiveFields.toMap)
+      CaseInsensitiveMap(dedupPrimitiveFields)
    }
  }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index c33a7c472842..101a9e6b9199 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -547,13 +547,13 @@ class RocksDB(
      readerMemUsage + memTableMemUsage + blockCacheUsage,
      pinnedBlocksMemUsage,
      totalSSTFilesBytes,
-      nativeOpsLatencyMicros.toMap,
+      nativeOpsLatencyMicros,
      commitLatencyMs,
      bytesCopied = fileManagerMetrics.bytesCopied,
      filesCopied = fileManagerMetrics.filesCopied,
      filesReused = fileManagerMetrics.filesReused,
      zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed,
-      nativeOpsMetrics = nativeOpsMetrics.toMap)
+      nativeOpsMetrics = nativeOpsMetrics)
  }
  /**
@@ -861,7 +861,7 @@ object RocksDBConf {
  }
  def getStringConf(conf: ConfEntry): String = {
-    Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default).toString } getOrElse {
+    Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default) } getOrElse {
      throw new IllegalArgumentException(
        s"Invalid value for '${conf.fullName}', must be a string"
      )

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org