This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 94407429427 [SPARK-39146][CORE][SQL] Introduce local singleton for `ObjectMapper` that may be reused 94407429427 is described below commit 944074294277849f8bb920e8c368ef837c364fb1 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Thu Sep 29 07:37:21 2022 -0500 [SPARK-39146][CORE][SQL] Introduce local singleton for `ObjectMapper` that may be reused ### What changes were proposed in this pull request? This PR introduces local singletons for Jackson `ObjectMapper` that may be reused in Spark code to reduce the cost of repeatedly creating `ObjectMapper`. ### Why are the changes needed? Minor performance improvement. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GitHub Actions Closes #37999 from LuciferYang/SPARK-39146-2. Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../org/apache/spark/ErrorClassesJSONReader.scala | 19 +++++++++++-------- .../spark/sql/catalyst/util/RebaseDateTime.scala | 8 ++++++-- .../execution/datasources/v2/DataSourceV2Utils.scala | 2 +- .../execution/datasources/v2/FileDataSourceV2.scala | 10 ++++++++-- 4 files changed, 26 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala b/core/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala index 9d6dd9dde07..e06fd1711d8 100644 --- a/core/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala +++ b/core/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala @@ -39,15 +39,9 @@ import org.apache.spark.annotation.DeveloperApi class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) { assert(jsonFileURLs.nonEmpty) - private def readAsMap(url: URL): SortedMap[String, ErrorInfo] = { - val mapper: JsonMapper = JsonMapper.builder() - .addModule(DefaultScalaModule) - .build() - mapper.readValue(url, new TypeReference[SortedMap[String, ErrorInfo]]() {}) - } - // Exposed for 
testing - private[spark] val errorInfoMap = jsonFileURLs.map(readAsMap).reduce(_ ++ _) + private[spark] val errorInfoMap = + jsonFileURLs.map(ErrorClassesJsonReader.readAsMap).reduce(_ ++ _) def getErrorMessage(errorClass: String, messageParameters: Map[String, String]): String = { val messageTemplate = getMessageTemplate(errorClass) @@ -88,6 +82,15 @@ class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) { } } +private object ErrorClassesJsonReader { + private val mapper: JsonMapper = JsonMapper.builder() + .addModule(DefaultScalaModule) + .build() + private def readAsMap(url: URL): SortedMap[String, ErrorInfo] = { + mapper.readValue(url, new TypeReference[SortedMap[String, ErrorInfo]]() {}) + } +} + /** * Information associated with an error class. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala index dc1c4dbe677..a2a63e2af42 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala @@ -268,13 +268,17 @@ object RebaseDateTime { micros + rebaseInfo.diffs(i) } + private lazy val mapper = { + val mapper = new ObjectMapper() with ClassTagExtensions + mapper.registerModule(DefaultScalaModule) + mapper + } + // Loads rebasing info from an JSON file. JSON records in the files should conform to // `JsonRebaseRecord`. AnyRefMap is used here instead of Scala's immutable map because // it is 2 times faster in DateTimeRebaseBenchmark. 
private[sql] def loadRebaseRecords(fileName: String): AnyRefMap[String, RebaseInfo] = { val file = Utils.getSparkClassLoader.getResource(fileName) - val mapper = new ObjectMapper() with ClassTagExtensions - mapper.registerModule(DefaultScalaModule) val jsonRebaseRecords = mapper.readValue[Seq[JsonRebaseRecord]](file) val anyRefMap = new AnyRefMap[String, RebaseInfo]((3 * jsonRebaseRecords.size) / 2) jsonRebaseRecords.foreach { jsonRecord => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala index 7fd61c44fd1..f1d1cc5a173 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala @@ -150,6 +150,7 @@ private[sql] object DataSourceV2Utils extends Logging { } } + private lazy val objectMapper = new ObjectMapper() private def getOptionsWithPaths( extraOptions: CaseInsensitiveMap[String], paths: String*): CaseInsensitiveMap[String] = { @@ -158,7 +159,6 @@ private[sql] object DataSourceV2Utils extends Logging { } else if (paths.length == 1) { extraOptions + ("path" -> paths.head) } else { - val objectMapper = new ObjectMapper() extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala index 08635d51172..0bd25064e35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala @@ -21,6 +21,7 @@ import java.util import scala.collection.JavaConverters._ import com.fasterxml.jackson.databind.ObjectMapper +import 
com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -50,9 +51,8 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister { lazy val sparkSession = SparkSession.active protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = { - val objectMapper = new ObjectMapper() val paths = Option(map.get("paths")).map { pathStr => - objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq + FileDataSourceV2.readPathsToSeq(pathStr) }.getOrElse(Seq.empty) paths ++ Option(map.get("path")).toSeq } @@ -113,3 +113,9 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister { } } } + +private object FileDataSourceV2 { + private lazy val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule) + private def readPathsToSeq(paths: String): Seq[String] = + objectMapper.readValue(paths, classOf[Seq[String]]) +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org