Liu Zhao created KYLIN-5271:
-------------------------------
Summary: Query memory leaks
Key: KYLIN-5271
URL: https://issues.apache.org/jira/browse/KYLIN-5271
Project: Kylin
Issue Type: Bug
Components: Query Engine
Affects Versions: v4.0.1
Reporter: Liu Zhao
The query thread will clone a SparkSession and put it into ThreadLocal.
However, if an exception occurs in the Calcite To SparkPlan, the SparkSession
in ThreadLocal will not be removed. More importantly, if the Spark restarts
later, the SparkSession left in ThreadLocal will be unavailable, and the query
on this thread will fail, throwing an exception: Caused by:
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
This stopped SparkContext was created at:
org.apache.spark.sql.SparderContext$$anon$4.run(SparderContext.scala:150)
java.lang.Thread.run(Thread.java:748)
// put SparkSession toThreadLocal
{code:java}
object SparderContextFacade extends Logging {
final val CURRENT_SPARKSESSION: InternalThreadLocal[Pair[SparkSession,
UdfManager]] =
new InternalThreadLocal[Pair[SparkSession, UdfManager]]()
def current(): Pair[SparkSession, UdfManager] = {
if (CURRENT_SPARKSESSION.get() == null) {
val spark = SparderContext.getOriginalSparkSession.cloneSession()
CURRENT_SPARKSESSION.set(new Pair[SparkSession, UdfManager](spark,
UdfManager.createWithoutBuildInFunc(spark)))
}
CURRENT_SPARKSESSION.get()
}
def remove(): Unit = {
CURRENT_SPARKSESSION.remove()
}
}
{code}
// remove SparkSession from ThreadLocal
// org.apache.kylin.query.runtime.plans.ResultPlan
{code:java}
def getResult(df: DataFrame, rowType: RelDataType, resultType: ResultType)
: Either[Enumerable[Array[Any]], Enumerable[Any]] = withScope(df) {
val result: Either[Enumerable[Array[Any]], Enumerable[Any]] =
resultType match {
case ResultType.NORMAL =>
if (SparderContext.needCompute()) {
Left(ResultPlan.collectEnumerable(df, rowType))
} else {
Left(Linq4j.asEnumerable(Array.empty[Array[Any]]))
}
case ResultType.SCALA =>
if (SparderContext.needCompute()) {
Right(ResultPlan.collectScalarEnumerable(df, rowType))
} else {
Right(Linq4j.asEnumerable(Lists.newArrayList[Any]()))
}
}
SparderContext.cleanQueryInfo()
SparderContext.closeThreadSparkSession()
result
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)