This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
commit 6d4891b32cee585523a51a5304d6aa3c47bb8af8
Author: Herman van Hovell <her...@databricks.com>
AuthorDate: Sun Aug 13 02:49:19 2023 +0200

    Add classloader Id to code generation cache.
---
 .../expressions/codegen/CodeGenerator.scala        | 40 ++++++++++++++--------
 1 file changed, 26 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 8d10f6cd295..59688cae889 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions.codegen
 
 import java.io.ByteArrayInputStream
+import java.util.UUID
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -25,6 +26,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
+import com.google.common.cache.{CacheBuilder, CacheLoader}
 import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
 import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}
 import org.codehaus.janino.ClassBodyEvaluator
@@ -1439,7 +1441,7 @@ object CodeGenerator extends Logging {
    * @return a pair of a generated class and the bytecode statistics of generated functions.
    */
   def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = try {
-    cache.get(code)
+    cache.get((classLoaderUUID.get(Utils.getContextOrSparkClassLoader), code))
   } catch {
     // Cache.get() may wrap the original exception. See the following URL
     // https://guava.dev/releases/14.0.1/api/docs/com/google/common/cache/
@@ -1581,20 +1583,30 @@ object CodeGenerator extends Logging {
    * aborted. See [[NonFateSharingCache]] for more details.
    */
   private val cache = {
-    def loadFunc: CodeAndComment => (GeneratedClass, ByteCodeStats) = code => {
-      val startTime = System.nanoTime()
-      val result = doCompile(code)
-      val endTime = System.nanoTime()
-      val duration = endTime - startTime
-      val timeMs: Double = duration.toDouble / NANOS_PER_MILLIS
-      CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length)
-      CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong)
-      logInfo(s"Code generated in $timeMs ms")
-      _compileTime.add(duration)
-      result
+    val loadFunc: ((ClassLoaderId, CodeAndComment)) => (GeneratedClass, ByteCodeStats) = {
+      case (_, code) =>
+        val startTime = System.nanoTime()
+        val result = doCompile(code)
+        val endTime = System.nanoTime()
+        val duration = endTime - startTime
+        val timeMs: Double = duration.toDouble / NANOS_PER_MILLIS
+        CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length)
+        CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong)
+        logInfo(s"Code generated in $timeMs ms")
+        _compileTime.add(duration)
+        result
     }
-    NonFateSharingCache[CodeAndComment, (GeneratedClass, ByteCodeStats)](
-      loadFunc, SQLConf.get.codegenCacheMaxEntries)
+    NonFateSharingCache(loadFunc, SQLConf.get.codegenCacheMaxEntries)
+  }
+
+  type ClassLoaderId = String
+  private val classLoaderUUID = {
+    NonFateSharingCache(CacheBuilder.newBuilder()
+      .weakKeys
+      .maximumSize(SQLConf.get.codegenCacheMaxEntries)
+      .build(new CacheLoader[ClassLoader, ClassLoaderId]() {
+        override def load(code: ClassLoader): ClassLoaderId = UUID.randomUUID.toString
+      }))
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org